Skip to content

feat: response body mode framework for streaming support#169

Open
noyitz wants to merge 6 commits into
llm-d:mainfrom
noyitz:feat/response-body-mode-framework
Open

feat: response body mode framework for streaming support#169
noyitz wants to merge 6 commits into
llm-d:mainfrom
noyitz:feat/response-body-mode-framework

Conversation

@noyitz

@noyitz noyitz commented Jun 15, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Add BodyProcessingMode type (Skip, Chunks, Full) and ResponseBodyRequirement interface for response plugins to declare their body access needs
  • Pre-compute NeedsResponseBuffering per profile at startup based on plugin declarations
  • When NeedsResponseBuffering=false for the selected profile, ack each response body chunk immediately — enabling real-time client streaming
  • Plugins that don't implement the interface default to Full (backward compatible)

Body Processing Modes

Mode Behavior Use case
Skip Plugin doesn't need response body Headers-only plugins
Chunks Plugin processes chunks as they stream (must implement ChunkProcessor) Metering, logging
Full Framework buffers entire response before calling plugin API translation, body mutation

How it works

At startup, the config loader iterates each profile's response plugins and checks their BodyProcessingMode() declaration:

  • If ANY plugin declares FullNeedsResponseBuffering=true for that profile
  • Otherwise → NeedsResponseBuffering=false, chunks are acked immediately

This is a per-profile decision, so different profiles can have different buffering behavior.

Test plan

  • Unit tests for computeResponseBuffering covering all mode combinations
  • Tests for validation: Chunks without ChunkProcessor implementation fails at startup
  • Multi-profile test verifying independent buffering decisions

@github-actions github-actions Bot added size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. labels Jun 15, 2026
@github-actions

Copy link
Copy Markdown

⚠️ Large PR detected

Your PR is large. Please consider breaking it into multiple PRs.

The do-not-merge/hold label has been added and can be removed by the reviewers based on their judgement.

@github-actions

Copy link
Copy Markdown

⚠️ Large PR detected

Your PR is large. Please consider breaking it into multiple PRs.

The do-not-merge/hold label has been added and can be removed by the reviewers based on their judgement.

noyitz added a commit to noyitz/llm-d-inference-payload-processor that referenced this pull request Jun 17, 2026
Add ChunkProcessor interface that plugins declaring BodyChunked must
implement. The framework validates this at startup and pre-computes
the list of ChunkProcessors per profile. When streaming (no buffering),
each response body chunk is passed through all ChunkProcessors before
being acked to Envoy, enabling in-flight chunk transformation.

Depends on llm-d#169

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
@noyitz noyitz force-pushed the feat/response-body-mode-framework branch from bf93834 to 381fe43 Compare June 17, 2026 16:11
@github-actions

Copy link
Copy Markdown

⚠️ Large PR detected

Your PR is large. Please consider breaking it into multiple PRs.

The do-not-merge/hold label has been added and can be removed by the reviewers based on their judgement.

@nirrozenbaum nirrozenbaum removed the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Jun 19, 2026
Comment thread docs/proposals/044-plugin-body-mode-capabilities/README.md Outdated
Comment thread pkg/config/loader/configloader.go Outdated
Comment thread docs/proposals/044-plugin-body-mode-capabilities/README.md Outdated
Comment thread docs/proposals/044-plugin-body-mode-capabilities/README.md Outdated
…cessing

Split response processing into two interfaces per review feedback:

- ResponseProcessor: processes the complete buffered response body (existing)
- ResponseChunkProcessor: processes individual chunks without buffering (new)

The Profile now carries both processor lists and a NeedsResponseBuffering
flag. The config loader sorts response plugins into the right list based
on which interface they implement. A plugin can implement both.

When ResponseProcessor plugins are present → buffer the full response.
When only ResponseChunkProcessors are present → stream chunks through.
When neither is present → buffer (backward compatible).

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
@noyitz noyitz force-pushed the feat/response-body-mode-framework branch from 381fe43 to 5f2053c Compare June 22, 2026 12:36
@github-actions github-actions Bot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Jun 22, 2026
Comment thread pkg/handlers/server.go Outdated
Comment thread pkg/config/loader/configloader.go Outdated
Comment thread pkg/handlers/server.go Outdated
Comment thread pkg/handlers/response.go Outdated
…alidate no mixing

- Simplify server.go buffering check to Profile.NeedsResponseBuffering (comment 1)
- Remove matched variable, use continue pattern in config loader (comment 2)
- Move chunk processing from server.go to HandleResponseChunk in response.go (comment 3)
- Add validation: reject profiles that mix ResponseProcessor and
  ResponseChunkProcessor plugins (comment 4)
- HandleResponseChunk runs chunk processors with metrics and logging,
  following the same pattern as HandleResponseBody
- Update streaming test to set NeedsResponseBuffering for buffered path

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
wantFullBody := []byte(`{"choices":[{"text":"Hello!"}]}`)

profiles := newTestProfiles()
profiles[testProfileName].NeedsResponseBuffering = true

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add in a follow up unit tests to cover the new funcionality?

  • mix of body response plugins + chunk response fails.
  • setting profile with chunks only perform streaming.
  • setting profile with full only is buffering.
  • setting profile with no plugins performs streaming.

not a blocker for this PR (can be pushed in follow up).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — will add in a follow-up PR.

Comment thread pkg/handlers/server.go
metrics.RecordRequestTTFT(model, reqCtx.ResponseFirstChunkTimestamp.Sub(reqCtx.RequestReceivedTimestamp))
responses, err = s.HandleResponseBody(ctx, reqCtx, responseBody)
loggerVerbose.Info("processing response body complete")
} else {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now this looks nice and clean :)

Comment thread pkg/handlers/server.go Outdated
Comment thread pkg/framework/interface/requesthandling/plugins.go Outdated
Comment thread pkg/handlers/response.go
noyitz added 2 commits June 25, 2026 10:53
- Move TTFT metric recording after if/else so it applies to both
  buffered and streaming paths (comment 3+4)
- Change ResponseChunkProcessor interface: takes string (framework
  converts once) + InferenceResponse (for header mutation) (comment 5)
- Add nil profile check and empty processor check in HandleResponseChunk
  for bodiless requests like GET /v1/models (comment 6)
- Extract runResponseChunkProcessors and buildStreamedChunkResponse
  following the same patterns as the body processing path

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
In FULL_DUPLEX_STREAMED mode, envoy needs the deferred HeadersResponse
before it forwards response body chunks to the client. Previously,
buildStreamedChunkResponse only sent HeadersResponse on endOfStream,
which caused empty response bodies when EoS was delayed or missing.

Track ResponseHeadersSent on RequestContext and send HeadersResponse
with the first chunk instead.

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
Comment thread pkg/handlers/response.go

// HandleResponseChunk runs ResponseChunkProcessors on a single response body chunk
// and wraps the result in the ext_proc streaming response format.
func (s *Server) HandleResponseChunk(ctx context.Context, reqCtx *RequestContext, chunkBytes []byte, endOfStream bool) ([]*eppb.ProcessingResponse, error) {

@nirrozenbaum nirrozenbaum Jun 25, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something is still wrong in this function..
chunk processor plugins cannot mutate the chunk content, and even if they do it's ignored in L151.
also, I've noticed that BodyMutation is always used even if chunk hasn't changed, which is an expensive operation in envoy..

we should also probably distinguish between the case when chunk was mutated and the case when it was passed through.
we can improve that last point on a follow up PR, leaving mutation always works for now.
but the first point is functionality wise wrong.

noyitz added 2 commits June 25, 2026 13:56
- Add CurrentChunk/SetChunk/ChunkMutated to InferenceResponse so chunk
  processors can read and mutate chunk content through the response
  object rather than a pass-by-value string parameter.
- HandleResponseChunk sets ResetChunkState before plugins run and uses
  the (potentially mutated) chunk in buildStreamedChunkResponse.
- Move TTFT metric recording after the if/else block so it fires for
  both buffered and streaming response paths. Change continue to break
  in the buffering accumulation branch so the metric code is reachable.

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
- runResponseChunkProcessors passes response.CurrentChunk to each
  plugin so mutations from earlier plugins are visible to later ones.
- Add unit tests for SetChunk, ChunkMutated, ResetChunkState.

Signed-off-by: Noy Itzikowitz <nitzikow@redhat.com>
@noyitz

noyitz commented Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

Addressed the latest comments in two commits:

63991fd — chunk mutation via InferenceResponse:

  • Added CurrentChunk, SetChunk(), ChunkMutated(), ResetChunkState() on InferenceResponse — plugins read/mutate the chunk through the response object, not the pass-by-value string parameter.
  • HandleResponseChunk calls ResetChunkState(chunk) before plugins run, then checks ChunkMutated() and uses the modified version in buildStreamedChunkResponse.
  • runResponseChunkProcessors passes response.CurrentChunk to each plugin so mutations chain across the pipeline.

63991fd — TTFT metric fix:

  • Changed continue to break in the buffering accumulation branch so the TTFT metric code after the if/else block is reachable for both buffered and streaming paths.

d793ccb — tests:

  • TestChunkMutation — verifies SetChunk marks mutated, ResetChunkState sets chunk without marking mutated.
  • TestChunkMutation_ResetClearsMutatedFlag — verifies ResetChunkState clears the mutated flag between chunks.

Also fixed buildStreamedChunkResponse to send HeadersResponse with the first chunk (not EoS) — envoy needs it before accepting body responses in FULL_DUPLEX_STREAMED mode.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size/L Denotes a PR that changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants